Платформа данных в Леруа Мерлен. Part 2. Обновления 2021 года: Flink и Superset

Итак, в 2021 год мы вступили со следующей архитектурой:

У нас есть DWH, в который мы различными путями укладываем CDC поток с большого количества источников, который обрабатываем с помощью процедур, запускаемых через Airflow и формируем DDS и витрины. Также у нас есть DataLake на S3, в котором лежит сырьё.

Мы добавили возможность работать с CDC нереляционных баз, таких как Mongo (тоже с помощью Debezium выгружали их в Kafka), начали обрабатывать канонические объекты – это, по сути, такие структурированные данные по строго принятым в компании схемам. Также мы добавили возможность работать с event streams, такими как clickstreams с онлайн-площадок, или эвентами CDP (customer data platform). После этого мы собирали их в формат parquet с помощью NiFi и выгружали в Yandex Object Storage, после этого с помощью Spark парсили и загружали в GreenPlum (через PXF). 

Точки роста

Двигателем нашей платформы являются наши пользователи: дата-инженеры, аналитики и дата-сайентисты. Без них, вся наша польза для бизнеса имела бы отрицательную стоимость. Поэтому, в первую очередь, мы собрали от них фидбек о нашей работе.

Пользовательский опыт

  • Оперативные отчеты

Самая частая просьба пользователей – дать им возможность строить отчеты на основе оперативных данных. Данных за прошедший день уже было недостаточно для адекватной оценки бизнес-процессов, появился запрос на показатели, близкие к реальному времени.

  • Сложность интеграции

Боль пользователей была в необходимости создавать CDC на стороне источников, проливать данные в Kafka, коммитить в CI DDL GreenPlum’а, рисовать в NiFi ETL процессы. При этом, некоторые пользователи не умели даже работать с GIT’ом, не говоря уже о разработке в локальном докере, запуске тестов в пайплайнах, DAG’ах Airflow и прочем.

  • Время обработки запросов

«Вчера мой SQL запрос обрабатывался 30 секунд, а сегодня уже 10 минут» – с такими словами начали приходить пользователи. Причина была простая – несмотря на то что у GreenPlum есть разграничения ресурсов по ресурсным группам и очередям, пользовательские запросы все равно замедлялись под большой нагрузкой, когда на кластере работали тяжелые ETL-процессы.

  • Прямые запросы (DirectQuery)

Пользователи хотели получить возможность подключать свои BI-инструменты в режиме Direct Query. По сути, каждый дашборд может генерить свои запросы к источнику данных, а мы на GreenPlum ограничиваем количество подключений каждой учетки. В итоге их дашборды просто не прогружались. Можно было объяснять пользователям специфику работы GreenPlum, говорить, что это аналитическое хранилище, которое работает определенным образом. Но мы сами понимали, что платформе нужно развитие.

Административный опыт

Но не только у пользователей возникали проблемы — нам как владельцам платформы тоже было важно экономить свое время, находить точки роста и оптимизировать нашу работу.

  • Ресурсы

У нас получилось достаточно ресурсоемкое решение – кластеру порой приходилось обрабатывать порядка 150 млн CDC-событий за запуск процедуры, а это занимало иногда до 4 часов. Время деградировало из-за накопленной истории. Партиционирование, конечно, помогало до определенного момента, когда количество тяжелых источников в платформе не стало исчисляться несколькими десятками. 

Также из-за того, что пользователи могли ходить в ODS-слои и рассчитывать на них дашборды, нагрузка на кластер все больше и больше росла. Решением могло стать создание большого количества витрин. Но мы за подход Data mesh во всем бизнесе, мы не хотим становиться единым центром компетенций, а хотим, чтобы вся компания трансформировалась и развивалась в направлении работы с данными. Но при этом свободных рук дата-инженеров во всех направлениях (доменах) компании постоянно не хватало. 

  • Контроль

Пользователи наши друзья, и мы стараемся им во всем помогать. Но, не имея должного опыта работы с GreenPlum, не зная его специфики, невозможно написать оптимальный запрос. Поэтому в кластере бежало большое количество кривых запросов, нам же было тяжело их отлавливать, так как в минуту пробегало порядка 2 тысяч запросов. Плюс каждый пользователь подключался со своей локальной машины – кто из DBeaver, кто из PGAdmin, а кто своими питоновскими скриптами. 

  • Платформенность

По нашим наблюдениям, GreenPlum в компании стал синонимом дата платформы. Если не работает он – ничего не работает: отчетность, дашборды, построенные на данных сервисы. Мы поняли, что нам нужно развивать наше решение и эволюционировать, не закупая новое железо под GreenPlum или заказывая виртуалки, а оптимизируя процессы загрузки и выгрузки данных, упрощая процедуры процессинга и работы с данными. 

В итоге мы решили сфокусироваться на создании оперативной отчетности и контроле за действиями пользователей. Эти задачи мы решили с помощью 2х новых компонентов архитектуры.

Апдейты 2021

Операционная аналитика на Flink

В начале, чтобы удовлетворить самую большую потребность пользователей (в операционных данных), мы решили создать новый сервис, который назвали «Операционная аналитика». Построили мы его на базе Flink (фреймворк потоковой обработки данных). Начиная с версии 1.11 с июля 2020 года в нем появился функционал для работы с CDC потоками, как-раз генерируемых Debezium. 

Также в версии 1.12 у Flink появилась возможность работать с CDC+Avro и со схемами, хранящимися в Confluent Schema Registry. 

В результате выполнения простых запросов на потоке можно получать постоянно обновляющиеся counter’ы. Мы проверили этот формат работы, он показал прекрасные результаты, и то, что на GreenPlum считалось часами, мы могли уже считать на потоке с помощью Flink, еще и в режиме near-realtime. Кейс этот очень нужный, так как, например, показатели по товарообороту бизнес интересуют с минимальной задержкой. 

При этом не все было так гладко. С какими трудностями мы столкнулись при работе Flink с построением операционной аналитики? Минусы операционной аналитики на Flink:

  • Один источник – один топик

У нас в Kafka данные хранятся таким образом: один источник – один топик. То есть по большому количеству таблиц в одном источнике используется один топик. И если нам необходимо в Flink посчитать данные только на одной таблице для одного источника, нам приходится читать весь поток по источнику.  

  • Нет метаданных по Debezium

Следствие предыдущего минуса – в текущей версии Flink невозможно получить метаданные из полей Debezium. То есть невозможно сейчас точно определить имя таблицы, считав имя сообщения из Kafka CDC потока. Ждем фикс.

  • Ограничения генерации схем

Если вы работаете с данными в avro, но не используете schema registry, а храните схему в заголовках сообщений, на текущий момент Flink не может генерировать схему на основе этого заголовка – необходимо в таком случае схему задавать заранее. 

Несмотря на эти недостатки, мы выявили для себя много плюсов операционной аналитики на Flink:

  • Near-realtime данные на CDC потоках

Скорость позволяет давать возможность пользователям производить обработку данных в режиме близкому к реальному времени.

  • Поддержка SQL

Пользователи могут считать каунтеры, написав запросы на FlinkSQL – им не нужно изучать ни Scala, ни Java, никакие другие языки программирования, кроме SQL.

  • Стабильность

Flink – решение, которое существует на рынке давно. Возможность его кластеризации и контейнеризации позволяют нам строить отказоустойчивые HA-решения при правильном использовании savepoint’ов и checkpoint’ов. 

  • Настоящий стриминг

В отличии от Спарка – у флика т.н. “true”-streaming, а не микро батчи, что как раз и позволяет ему работать с потоками CDC.

  • Поддержка Avro Confluent schema registry 

Из коробки можно подключится к Confluent Schema Registry и забирать схемы сообщений из него:

FlinkSQL
CREATE TABLE test1(    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,    `lSequenceNumber` INT,    `Date` STRING,    `RequestType` STRING,    `TransactionDate` STRING,    `TransactionTime` STRING,    `Amount` DECIMAL(10, 4)  )  WITH (    'connector' = 'kafka',    'topic' = 'data.init.database.avro.test',    'properties.bootstrap.servers' = kafka0:9092,kafka1:9092,kafka2:9092, kafka3:9092,kafka4:9092',    'properties.group.id' = 'flink-group-test1',    'properties.security.protocol' = 'SASL_SSL',    'properties.sasl.mechanism' = 'SCRAM-SHA-256',    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="***" password="***";',    'properties.ssl.truststore.location' = '/home/client.truststore.jks',    'properties.ssl.truststore.password' = '***',    'scan.startup.mode' = 'earliest-offset',    'value.format' = 'debezium-avro-confluent',    'value.debezium-avro-confluent.schema-registry.url' = 'sr0:8081'   );
  • Удобный и подробный мониторинг

Для отслеживания состояния джоб флинк предоставляет подробный интерфейс мониторинга (кмк более простой в освоении, чем интерфейс спарка):

UI monitoring
  • Большое число коннекторов.

За время существования Flink’а, для него было написано множество source и sink коннекторов, что позволяет реализовывать в своих ETL процессах интеграцию со множеством решений. Как пример можем привести clickhouse-sink коннектор, от коллег из ivi, который позволяет укладывать данные напрямую в CH.

Сравнивать Spark с Flink в контексте нашей задачи бессмысленно, т.к. killer-фичей для нас оказалась возможность работать с CDC потоками, чего не умеет делать спарк из коробки.

В итоге на данный момент мы имеем следующую структуру для near-realtime аналитики:

ETL для оперативных данных мы заменяем на Flink, развернув его в Kubernetes, данные по операционной аналитике мы загружаем в Yandex Object Storage с CH над ним.

Пользовательский опыт на Apache Superset

Мы внедрили Apache Superset – быстрый, легкий, интуитивно понятный веб-интерфейс для работы с данными и написания SQL-запросов. Он позволяет писать эти запросы к огромному количеству источников и визуализировать их в графиках и дашбордах.

Когда мы развернули его, мы нашли несколько недостатков:

  • Кодировка UTF-8

С кодировкой UTF-8 русские символы неправильно кодировались при выгрузке в CSV, и отображались т.н. краказябры. В текущей версии (1.2.0) это уже исправлено.

  • Н